AWS Step FunctionsでLambdaを組み合わせたバッチ処理を作る
AWS re:Invent 2016で発表されたAWS Step Functionsは複数のLambda Functionを組み合わせて分散アプリケーションを作ることができるサービスです(Lambdaのほか、EC2、ECSでも可能)。
今回はバッチ処理の中でよくある一連の処理(ファイル読み込み、編集、DB登録)をStep Functionsで組み合わせてみます。
各Stepの概要
Step FunctionsではDSL(Amazon States Language)を使って状態遷移を実装します。DSLで指定できるStepは以下の通りです。
State | 説明 |
---|---|
Pass | 何もしない。inputデータをoutputへそのまま渡す |
Task | 処理の実行単位 |
Choice | 分岐 |
Wait | 開始まで待つ、日時指定可能 |
Succeed | 成功 |
Fail | 失敗 |
Pararell | 並列処理 |
バッチ処理
それでは状態遷移を実装していきましょう。以下の処理を行います。
- CSVファイルを取得してS3バケットに保存
- ファイルが存在するかどうか判定。存在すれば編集処理へ、存在しなければ終了
- 上で保存されたCSVファイルを編集して別のS3バケットに保存
- 編集したCSVをRDSに保存、同時に処理対象が存在したことを管理者に通知
これらの状態を管理するために、Task, Choice, Pass, ParallelのStepを組み合わせてDSLを書いていくと、このようになりました。
Amazon States Language
{ "Comment": "fetch csv and insert records", "StartAt": "Fetch_CSV", "States": { "Fetch_CSV": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FetchCSV", "Next": "Record_Exists?" }, "Record_Exists?": { "Type": "Choice", "Choices": [ { "Variable": "$.response_cd", "StringEquals": "1", "Next": "Modify" }, { "Variable": "$.response_cd", "StringEquals": "2", "Next": "No_File" } ], "Default": "Modify" }, "No_File": { "Type": "Pass", "End": true }, "Modify": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ModifyCSV", "Next": "Insert" }, "Insert": { "Type": "Parallel", "Next": "Insert_Finished", "Branches": [ { "StartAt": "Insert_Records", "States": { "Insert_Records": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:InsertRecord", "End": true } } }, { "StartAt": "Notify", "States": { "Notify": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:NotifySLK", "End": true } } } ] }, "Insert_Finished": { "Type": "Pass", "End": true } } }
各タスクの説明
それぞれのタスクの説明をします。
Lambda Functionはそれぞれの処理を実装すれば良いのですが、Step Functionsで状態を管理するための実装もあるのでコードも一応載せておきます。
1.CSVの取得
CSV取得処理では、ファイルが存在するかどうかでレスポンスコードを変えています。 後続のファイル存在有無判定でこのコードを使用します。
Amazon States Language
"Fetch_CSV": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FetchCSV", "Next": "Record_Exists?" },
Lambda Function
import requests import boto3 def handler(event, context): s3 = boto3.resource('s3') response = requests.get('http://www.example.com/test.csv') # ファイルが存在しない場合はレスポンスコード:2 を返却 if not (response.status_code == 200): return {u'response_cd': u'2'} obj = s3.Bucket('test-bucket').Object('test.csv') res = obj.put( Body="".join(response.text).encode('utf-8'), ContentEncoding='utf-8', ContentType='text/plane' ) # ダウンロード処理を行った場合はレスポンスコード:1 を返却 return {u'response_cd': u'1'}
2.ファイル存在有無の判定
判定は前処理のレスポンスコードを使用しています。 Choiceタスクの中でレスポンスコードを見て、分岐させています。
Amazon States Language
"Record_Exists?": { "Type": "Choice", "Choices": [ { "Variable": "$.response_cd", "StringEquals": "1", "Next": "Modify" }, { "Variable": "$.response_cd", "StringEquals": "2", "Next": "No_File" } ], "Default": "Modify" },
3. ファイルが存在しない場合の終了処理
前の処理で処理対象のファイルが存在しなかった場合、バッチの実行を終了します。
Amazon States Language
"No_File": { "Type": "Pass", "End": true },
4.ファイル編集処理,
ここでは、取得したCSVの中から必要なデータのみを抽出して別のS3バケットに保存します。
(編集処理の中身は特に関係ありません。)
Amazon States Language
"Modify": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ModifyCSV", "Next": "Insert" },
Lambda Function
import boto3 def handler(event, context): s3 = boto3.resource('s3') bucket = s3.Bucket('test-bucket') obj = s3.Object('test-bucket', 'test.csv') body = obj.get()['Body'].read().decode('utf-8') content = [] for i, line in enumerate(body.split('\r\n')): if i < 2: continue content.append(','.join(line.split(',')[0:4]) + "\n") obj2 = s3.Bucket('test-bucket2').Object('modify.csv') obj2.put( Body="".join(content).encode('utf-8'), ContentEncoding='utf-8', ContentType='text/plane' )
5. CSVをRDSに保存、バッチ管理者へ処理をすることの通知処理
このバッチでは処理対象が存在した場合は管理者へ通知するという前提で処理を組み立てることにします。 CSVファイルの内容をRDSに保存する処理と管理者への通知は逐次処理で行う必要もないのでParallelタスクを使って並列実行します。 (db登録処理の内容は特に関係ありません。)
Amazon States Language
"Insert": { "Type": "Parallel", "Next": "Insert_Finished", "Branches": [ { "StartAt": "Insert_Records", "States": { "Insert_Records": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:InsertRecord"\ , "End": true } } }, { "StartAt": "Notify", "States": { "Notify": { "Type": "Task", "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:NotifySLK", "End": true } } } ] },
Lambda Function
import boto3 import psycopg2 def handler(event, content): s3 = boto3.resource('s3') obj = s3.Object('test-bucket2', 'modify.csv') body = obj.get()['Body'].read().decode('utf-8') try: conn = psycopg2.connect( host="hostname", database="dbname", port="5432", user="username", password="password" ) cur = conn.cursor() for i, line in enumerate(body.split('\n')): if i < 1: continue if not len(line.split(',')) == 4: continue date, usd, gbp, eur = [i for i in line.split(',')] cur.execute("insert into exchange values(%s, %s, %s,%s)", [date, usd, gbp, eur]) conn.commit() cur.close() conn.close() except: print("error")
まとめ
AWS Step Functionsを使うとLambda Functionの実装で状態を持つことなく複数Lambdaを組み立てた処理を構築することができます。
それぞれのFunctionは状態を持たずに別のサービス(Step Functions)で状態を管理する仕組みは、書いていて関数型プログラミングで処理を組み立てているような印象を受けました。状態を持たないので保守性、可読性の高いコードを書くことが容易になります。
もちろんマネージドサービスを使うことによる、運用コストが下がることやスケールの容易さも魅力です。